Threading pt 1

* a1 now has a separate, continuous polling thread;
  this will enable operations like database cleanup (based on ACKs) and external notifications in real time,
  rather than only when the API is invoked
* all rmr send and receive operations now happen in this thread
* introduces a thread-safe job queue between the two threads (see the sketch below)
* Not done yet: database cleanups in the thread
* Bump rmr python version
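
The division of labor between the two threads looks roughly like the sketch
below (illustrative only; names such as api_handler and polling_loop are
hypothetical, the real implementation lives in a1/a1rmr.py and a1/run.py):

    import queue
    import threading
    import time

    work_queue = queue.Queue()  # thread-safe handoff between the two threads

    def api_handler():
        # web handlers only enqueue work; they never touch rmr directly
        work_queue.put({"payload": '{"operation": "CREATE"}', "msg type": 20000})

    def polling_loop():
        while True:
            # drain all outbound work first ...
            while not work_queue.empty():
                item = work_queue.get(block=False)
                print("would rmr-send:", item["msg type"], item["payload"])
            # ... then read a1's mailbox for ACKs and update statuses (omitted here)
            time.sleep(1)

    threading.Thread(target=polling_loop, daemon=True).start()
    api_handler()
    time.sleep(2)  # give the polling thread a chance to drain the queue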

Change-Id: I29ca7843ffe7497c84891920f8aee332ac676591
Signed-off-by: Tommy Carpenter <tc677g@att.com>
diff --git a/a1/a1rmr.py b/a1/a1rmr.py
index 51b5694..d6114bf 100644
--- a/a1/a1rmr.py
+++ b/a1/a1rmr.py
@@ -15,40 +15,46 @@
 #   limitations under the License.
 # ==================================================================================
 import os
-import gevent
+import queue
+import time
+import json
 from rmr import rmr, helpers
 from a1 import get_module_logger
+from a1 import data
+from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
 logger = get_module_logger(__name__)
 
 
 RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
-MRC = None
+
+_SEND_QUEUE = queue.Queue()  # thread-safe queue; https://docs.python.org/3/library/queue.html
 
 
-def init_rmr():
+def _init_rmr():
     """
-    called from run; not called for unit tests
+    init an rmr context
+    This gets monkeypatched out for unit testing
     """
-    global MRC
     # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
     # internal ring of messages, and receive calls read from that
     # currently the size is 2048 messages, so this is fine for the foreseeable future
-    MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
+    mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
 
-    while rmr.rmr_ready(MRC) == 0:
-        gevent.sleep(1)
-        logger.debug("not yet ready")
+    while rmr.rmr_ready(mrc) == 0:
+        time.sleep(0.5)
+
+    return mrc
 
 
-def send(payload, message_type=0):
+def _send(mrc, payload, message_type=0):
     """
     Sends a message up to RETRY_TIMES
     If the message is sent successfully, it returns the transactionid
     Does nothing otherwise
     """
-    # we may be called many times in asynchronous loops, so for now, it is safer not to share buffers. We can investigate later whether this is really a problem.
-    sbuf = rmr.rmr_alloc_msg(MRC, 4096)
+    # TODO: investigate moving this below and allocating the space based on the payload size
+    sbuf = rmr.rmr_alloc_msg(mrc, 4096)
     payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
 
     # retry RETRY_TIMES to send the message
@@ -63,7 +69,7 @@
         transaction_id = pre_send_summary["transaction id"]  # save the transactionid because we need it later
 
         # send
-        sbuf = rmr.rmr_send_msg(MRC, sbuf)
+        sbuf = rmr.rmr_send_msg(mrc, sbuf)
         post_send_summary = rmr.message_summary(sbuf)
         logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
 
@@ -80,8 +86,85 @@
     return None
 
 
-def dequeue_all_waiting_messages(filter_type=[]):
+def _update_all_statuses(mrc):
     """
-    dequeue all waiting rmr messages from rmr
+    get all waiting messages and try to parse them as status updates
+    (currently those are the only messages a1 should get; this may have to be revisited later)
     """
-    return helpers.rmr_rcvall_msgs(MRC, filter_type)
+    for msg in helpers.rmr_rcvall_msgs(mrc, [21024]):
+        try:
+            pay = json.loads(msg["payload"])
+            data.set_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"])
+        except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError):
+            logger.debug("Dropping malformed or non applicable message")
+            logger.debug(msg)
+
+
+# Public
+
+
+def queue_work(item):
+    """
+    push an item into the work queue
+    currently the only type of work is to send out messages
+    """
+    _SEND_QUEUE.put(item)
+
+
+class RmrLoop:
+    """
+    represents an rmr loop meant to run in a long-standing separate thread
+    """
+
+    def __init__(self, real_init=True):
+        self._rmr_is_ready = False
+        self._keep_going = True
+        self._real_init = real_init  # useful for unit testing to turn off initialization
+
+    def rmr_is_ready(self):
+        """returns whether rmr has been initialized"""
+        return self._rmr_is_ready
+
+    def stop(self):
+        """sets a flag for the loop to end"""
+        self._keep_going = False
+
+    def loop(self):
+        """
+        This loop runs forever in an a1 thread and has 3 jobs:
+        - send out any messages that have to go out (create instance, delete instance)
+        - read a1's mailbox and update the status of all instances based on acks from downstream policy handlers
+        - clean up the database (e.g. delete the instance) under certain conditions based on those statuses (NOT DONE YET)
+        """
+
+        # get a context
+        mrc = None
+        logger.debug("Waiting for rmr to initialize...")
+        if self._real_init:
+            mrc = _init_rmr()
+        self._rmr_is_ready = True
+        logger.debug("Rmr is ready")
+
+        # loop forever
+        logger.debug("Work loop starting")
+        while self._keep_going:
+            """
+            We never raise an exception here; log and keep moving.
+            Bugs will eventually be caught by examining logs.
+            """
+            try:
+                # First, send out all messages waiting for us
+                while not _SEND_QUEUE.empty():
+                    work_item = _SEND_QUEUE.get(block=False, timeout=None)
+                    _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
+
+                # Next, update all statuses waiting in a1's mailbox
+                _update_all_statuses(mrc)
+
+                # TODO: next body of work is to try to clean up the database for any updated statuses
+
+            except Exception as e:
+                logger.debug("Polling thread encountered an unexpected exception, but it will continue:")
+                logger.exception(e)
+
+            time.sleep(1)
diff --git a/a1/controller.py b/a1/controller.py
index b0ae31a..b3ce88e 100644
--- a/a1/controller.py
+++ b/a1/controller.py
@@ -35,11 +35,9 @@
     """
     try:
         return func()
-    except (ValidationError, exceptions.PolicyTypeAlreadyExists, exceptions.CantDeleteNonEmptyType) as exc:
-        logger.exception(exc)
+    except (ValidationError, exceptions.PolicyTypeAlreadyExists, exceptions.CantDeleteNonEmptyType):
         return "", 400
-    except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound) as exc:
-        logger.exception(exc)
+    except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound):
         return "", 404
     except BaseException as exc:
         # catch all, should never happen...
@@ -170,7 +168,7 @@
 
         # send rmr (best effort)
         body = _gen_body_to_handler("CREATE", policy_type_id, policy_instance_id, payload=instance)
-        a1rmr.send(json.dumps(body), message_type=policy_type_id)
+        a1rmr.queue_work({"payload": json.dumps(body), "msg type": policy_type_id})
 
         return "", 202
 
@@ -190,7 +188,7 @@
 
         # send rmr (best effort)
         body = _gen_body_to_handler("DELETE", policy_type_id, policy_instance_id)
-        a1rmr.send(json.dumps(body), message_type=policy_type_id)
+        a1rmr.queue_work({"payload": json.dumps(body), "msg type": policy_type_id})
 
         return "", 202
 
diff --git a/a1/data.py b/a1/data.py
index 631d2c6..135d853 100644
--- a/a1/data.py
+++ b/a1/data.py
@@ -23,11 +23,9 @@
 For now, the database is in memory.
 We use dict data structures (KV) with the expectation of having to move this into Redis
 """
-import json
 import msgpack
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType
 from a1 import get_module_logger
-from a1 import a1rmr
 
 logger = get_module_logger(__name__)
 
@@ -132,36 +130,9 @@
 
 def _clean_up_type(policy_type_id):
     """
-    pop through a1s mailbox, updating a1s db of all policy statuses
     for all instances of type, see if it can be deleted
     """
     type_is_valid(policy_type_id)
-    for msg in a1rmr.dequeue_all_waiting_messages([21024]):
-        # try to parse the messages as responses. Drop those that are malformed
-        pay = json.loads(msg["payload"])
-        if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
-            # We don't use the parameters "policy_type_id, policy_instance" from above here,
-            # because we are popping the whole mailbox, which might include other statuses
-            pti = pay["policy_type_id"]
-            pii = pay["policy_instance_id"]
-
-            try:
-                """
-                can't raise an exception here e.g.:
-                because this is called on many functions; just drop bad status messages.
-                We def don't want bad messages that happen to hit a1s mailbox to blow up anything
-
-                """
-                type_is_valid(pti)
-                instance_is_valid(pti, pii)
-                SDL.set(_generate_handler_key(pti, pii, pay["handler_id"]), pay["status"])
-            except (PolicyTypeNotFound, PolicyInstanceNotFound):
-                pass
-
-        else:
-            logger.debug("Dropping message")
-            logger.debug(pay)
-
     for policy_instance_id in _get_instance_list(policy_type_id):
         # see if we can delete
         vector = _get_statuses(policy_type_id, policy_instance_id)
@@ -286,3 +257,16 @@
     """
     _clean_up_type(policy_type_id)
     return _get_instance_list(policy_type_id)
+
+
+# Statuses
+
+
+def set_status(policy_type_id, policy_instance_id, handler_id, status):
+    """
+    update the database status for a handler
+    called from a1's rmr thread
+    """
+    type_is_valid(policy_type_id)
+    instance_is_valid(policy_type_id, policy_instance_id)
+    SDL.set(_generate_handler_key(policy_type_id, policy_instance_id, handler_id), status)
diff --git a/a1/run.py b/a1/run.py
index 9eb59f5..67f63d6 100644
--- a/a1/run.py
+++ b/a1/run.py
@@ -1,3 +1,6 @@
+"""
+A1 entrypoint
+"""
 # ==================================================================================
 #       Copyright (c) 2019 Nokia
 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
@@ -14,18 +17,36 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
+import time
+from threading import Thread
 from gevent.pywsgi import WSGIServer
 from a1 import get_module_logger, app
-from a1.a1rmr import init_rmr
+from a1 import a1rmr
 
 
 logger = get_module_logger(__name__)
 
 
+def start_rmr_thread(real_init=True):
+    """
+    Start a1's rmr thread
+    Also called during unit testing
+    """
+    rmr_loop = a1rmr.RmrLoop(real_init)
+    thread = Thread(target=rmr_loop.loop)
+    thread.start()
+    while not rmr_loop.rmr_is_ready():
+        time.sleep(0.5)
+    return rmr_loop  # return the handle; useful during unit testing
+
+
 def main():
     """Entrypoint"""
-    logger.debug("Initializing rmr")
-    init_rmr()
+    # start rmr thread
+    logger.debug("Initializing rmr thread. A1s webserver will not start until rmr initialization is complete.")
+    start_rmr_thread()
+
+    # start webserver
     logger.debug("Starting gevent server")
     http_server = WSGIServer(("", 10000), app)
     http_server.serve_forever()
diff --git a/container-tag.yaml b/container-tag.yaml
index 4072098..efad040 100644
--- a/container-tag.yaml
+++ b/container-tag.yaml
@@ -1,4 +1,4 @@
 # The Jenkins job uses this string for the tag in the image name
 # for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
 ---
-tag: 1.0.1
+tag: 1.0.2
diff --git a/docs/release-notes.rst b/docs/release-notes.rst
index 2060a2a..a4af2b6 100644
--- a/docs/release-notes.rst
+++ b/docs/release-notes.rst
@@ -29,7 +29,20 @@
 
     * Represents a resillent version of 1.0.0 that uses Redis for persistence
 
-[1.0.1]
+[1.0.2] - 10/17/2019
+
+::
+
+    * a1 now has a separate, continuous polling thread;
+      this will enable operations like database cleanup (based on ACKs) and external notifications in real time,
+      rather than only when the API is invoked
+    * all rmr send and receive operations now happen in this thread
+    * introduces a thread-safe job queue between the two threads
+    * Not done yet: database cleanups in the thread
+    * Bump rmr python version
+    * Clean up some logging
+
+[1.0.1] - 10/15/2019
 
 ::
 
diff --git a/integration_tests/a1mediator/Chart.yaml b/integration_tests/a1mediator/Chart.yaml
index 792818d..58e1e1f 100644
--- a/integration_tests/a1mediator/Chart.yaml
+++ b/integration_tests/a1mediator/Chart.yaml
@@ -1,4 +1,4 @@
 apiVersion: v1
 description: A1 Helm chart for Kubernetes
 name: a1mediator
-version: 1.0.1
+version: 1.0.2
diff --git a/setup.py b/setup.py
index 9c242be..9894fb8 100644
--- a/setup.py
+++ b/setup.py
@@ -18,13 +18,13 @@
 
 setup(
     name="a1",
-    version="1.0.1",
+    version="1.0.2",
     packages=find_packages(exclude=["tests.*", "tests"]),
     author="Tommy Carpenter",
     description="RIC A1 Mediator for policy/intent changes",
     url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
     entry_points={"console_scripts": ["run.py=a1.run:main"]},
     # we require jsonschema, should be in that list, but connexion already requires a specific version of it
-    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.2"],
+    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.3"],
     package_data={"a1": ["openapi.yaml"]},
 )
diff --git a/tests/conftest.py b/tests/conftest.py
index c39b962..73573d0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,8 +1,49 @@
+"""
+pytest conftest
+"""
+# ==================================================================================
+#       Copyright (c) 2019 Nokia
+#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# ==================================================================================
+import tempfile
+import os
 import pytest
+from a1 import app
+
+
+@pytest.fixture
+def client():
+    """
+    http://flask.pocoo.org/docs/1.0/testing/
+    """
+
+    db_fd, app.app.config["DATABASE"] = tempfile.mkstemp()
+    app.app.config["TESTING"] = True
+    cl = app.app.test_client()
+
+    yield cl
+
+    os.close(db_fd)
+    os.unlink(app.app.config["DATABASE"])
 
 
 @pytest.fixture
 def adm_type_good():
+    """
+    represents a good put for adm control type
+    """
     return {
         "name": "Admission Control",
         "description": "various parameters to control admission of dual connection",
@@ -41,4 +82,7 @@
 
 @pytest.fixture
 def adm_instance_good():
+    """
+    represents a good put for adm control instance
+    """
     return {"enforce": True, "window_length": 10, "blocking_rate": 20, "trigger_threshold": 10}
diff --git a/tests/test_controller.py b/tests/test_controller.py
index f4e0ae2..e5c3ac7 100644
--- a/tests/test_controller.py
+++ b/tests/test_controller.py
@@ -14,12 +14,10 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
-import tempfile
-import os
 
+import time
 from rmr.rmr_mocks import rmr_mocks
-from a1 import app
-import pytest
+from a1 import run
 
 
 ADM_CTRL = "admission_control_policy"
@@ -30,23 +28,8 @@
 TEST_TYPE = "/a1-p/policytypes/20001"
 
 
-# http://flask.pocoo.org/docs/1.0/testing/
-@pytest.fixture
-def client():
-    db_fd, app.app.config["DATABASE"] = tempfile.mkstemp()
-    app.app.config["TESTING"] = True
-    cl = app.app.test_client()
-
-    yield cl
-
-    os.close(db_fd)
-    os.unlink(app.app.config["DATABASE"])
-
-
-def _fake_dequeue(_filter_type):
-    """
-    for monkeypatching a1rmnr.dequeue_all_messages with a good status
-    """
+def _fake_dequeue(_mrc, _filter_type):
+    """for monkeypatching with a good status"""
     fake_msg = {}
     pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}'
     fake_msg["payload"] = pay
@@ -54,17 +37,13 @@
     return new_messages
 
 
-def _fake_dequeue_none(_filter_type):
-    """
-    for monkeypatching a1rmnr.dequeue_all_messages with no waiting messages
-    """
+def _fake_dequeue_none(_mrc, _filter_type):
+    """for monkeypatching with no waiting messages"""
     return []
 
 
-def _fake_dequeue_deleted(_filter_type):
-    """
-    for monkeypatching a1rmnr.dequeue_all_messages with a DELETED status
-    """
+def _fake_dequeue_deleted(_mrc, _filter_type):
+    """for monkeypatching  with a DELETED status"""
     new_msgs = []
 
     # insert some that don't exist to make sure nothing blows up
@@ -85,7 +64,8 @@
 
 def _test_put_patch(monkeypatch):
     rmr_mocks.patch_rmr(monkeypatch)
-    monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(0))  # good sends for this whole batch
+    # assert that rmr bad states don't cause problems
+    monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
 
     # we need this because free expects a real sbuf
     # TODO: move this into rmr_mocks
@@ -109,12 +89,24 @@
     monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_set_transactionid)
 
 
+# Module level Hack
+
+
+RMR_THREAD = None
+
+
+def setup_module():
+    """module level setup"""
+    global RMR_THREAD
+    RMR_THREAD = run.start_rmr_thread(real_init=False)
+
+
 # Actual Tests
 
 
 def test_workflow_nothing_there_yet(client, monkeypatch, adm_type_good, adm_instance_good):
     """ test policy put good"""
-
+    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_none)
     # no type there yet
     res = client.get(ADM_CTRL_TYPE)
     assert res.status_code == 404
@@ -125,12 +117,15 @@
     assert res.json == []
 
     # instance 404 because type not there yet
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
     res = client.get(ADM_CTRL_POLICIES)
     assert res.status_code == 404
 
 
 def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
+    """
+    test a full A1 workflow
+    """
+    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_none)
     # put the type
     res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
     assert res.status_code == 201
@@ -148,7 +143,6 @@
     assert res.json == [20000]
 
     # instance 200 but empty list
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
     res = client.get(ADM_CTRL_POLICIES)
     assert res.status_code == 200
     assert res.json == []
@@ -161,8 +155,6 @@
 
     # create a good instance
     _test_put_patch(monkeypatch)
-    # assert that rmr bad states don't cause problems
-    monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
     res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
     assert res.status_code == 202
 
@@ -186,12 +178,13 @@
         assert res.status_code == 200
         assert res.get_data(as_text=True) == expected
 
-    # try a status get but pretend we didn't get any ACKs yet to test NOT IN EFFECT
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
+    # try a status get but we didn't get any ACKs yet to test NOT IN EFFECT
+    time.sleep(1)  # wait for the rmr thread
     get_instance_good("NOT IN EFFECT")
 
     # now pretend we did get a good ACK
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
+    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+    time.sleep(1)  # wait for the rmr thread
     get_instance_good("IN EFFECT")
 
     # cant delete type until there are no instances
@@ -205,11 +198,13 @@
     assert res.status_code == 202
 
     # status after a delete, but there are no messages yet, should still return
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
+    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+    time.sleep(1)  # wait for the rmr thread
     get_instance_good("IN EFFECT")
 
     # now pretend we deleted successfully
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_deleted)
+    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_deleted)
+    time.sleep(1)  # wait for the rmr thread
     res = client.get(ADM_CTRL_INSTANCE_STATUS)  # cant get status
     assert res.status_code == 404
     res = client.get(ADM_CTRL_INSTANCE)  # cant get instance
@@ -253,7 +248,8 @@
     assert res.status_code == 404
 
     # get a non existent instance
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
+    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+    time.sleep(1)
     res = client.get(ADM_CTRL_INSTANCE + "DARKNESS")
     assert res.status_code == 404
 
@@ -278,3 +274,8 @@
     """
     res = client.get("/a1-p/healthcheck")
     assert res.status_code == 200
+
+
+def teardown_module():
+    """module teardown"""
+    RMR_THREAD.stop()
diff --git a/tox.ini b/tox.ini
index 70389b6..8a47c3c 100644
--- a/tox.ini
+++ b/tox.ini
@@ -29,6 +29,7 @@
 
 # Note, before this will work, for the first time on that machine, run ./install_deps.sh
 commands =
+# sometimes the -s flag is helpful; add -s after pytest, which streams the logs as they come in rather than saving them all for the end of the tests
     pytest --junitxml xunit-results.xml --cov a1 --cov-report xml --cov-report term-missing --cov-report html --cov-fail-under=70
     coverage xml -i