Towards a1 1.0.0: rmr improvements
* Upgrade rmr to 1.9.0
* Upgrade rmr-python to 0.13.2
* Use the new helpers module in rmr-python for the rec all functionality
* Switch rmr mode to a multithreaded mode that continuously reads from rmr and populates an internal queue of messages with a deterministic queue size (2048) which is better behavior for A1
* Fix a memory leak (python obj is garbage collected but not the underlying C memory allocation)
Change-Id: I0f9cf7943071d6d58aef9c8c8bd86affe9b9223a
Signed-off-by: Tommy Carpenter <tc677g@att.com>
diff --git a/.gitignore b/.gitignore
index 542e654..9180140 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,8 @@
*.log
NOTES.txt
+docs/*
+rmr/*
+
.pytest_cache/
xunit-results.xml
.DS_Store
diff --git a/Dockerfile b/Dockerfile
index 4d76cff..988cdfd 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -17,7 +17,7 @@
# install a well known working rmr
FROM python:3.7-alpine
RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.8.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir build \
&& cd build \
diff --git a/Dockerfile-Unit-Test b/Dockerfile-Unit-Test
index dbaccbe..49df469 100644
--- a/Dockerfile-Unit-Test
+++ b/Dockerfile-Unit-Test
@@ -16,8 +16,8 @@
# ==================================================================================
# install a well known working rmr
FROM python:3.7-alpine
-RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.3.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
+RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir build \
&& cd build \
diff --git a/a1/a1rmr.py b/a1/a1rmr.py
index abbb84f..51b5694 100644
--- a/a1/a1rmr.py
+++ b/a1/a1rmr.py
@@ -16,7 +16,7 @@
# ==================================================================================
import os
import gevent
-from rmr import rmr
+from rmr import rmr, helpers
from a1 import get_module_logger
logger = get_module_logger(__name__)
@@ -31,7 +31,10 @@
called from run; not called for unit tests
"""
global MRC
- MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
+ # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
+ # internal ring of messages, and receive calls read from that
+ # currently the size is 2048 messages, so this is fine for the foreseeable future
+ MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
while rmr.rmr_ready(MRC) == 0:
gevent.sleep(1)
@@ -44,7 +47,7 @@
If the message is sent successfully, it returns the transactionid
Does nothing otherwise
"""
- # we may be called many times in asyncronous loops, so for now, it is safer not to share buffers. We can investifgate later whether this is really a problem.
+ # we may be called many times in asynchronous loops, so for now, it is safer not to share buffers. We can investigate later whether this is really a problem.
sbuf = rmr.rmr_alloc_msg(MRC, 4096)
payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
@@ -68,33 +71,17 @@
if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
# we are good
logger.debug("Message sent successfully!")
+ rmr.rmr_free_msg(sbuf)
return transaction_id
# we failed all RETRY_TIMES
logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
+ rmr.rmr_free_msg(sbuf)
return None
-def dequeue_all_waiting_messages(filter_type=None):
+def dequeue_all_waiting_messages(filter_type=[]):
"""
dequeue all waiting rmr messages from rmr
- We only add messages of type 21024; we drop other "spam";
- see https://rancodev.atlassian.net/wiki/spaces/RICPLT/pages/60784719/RIC+message+types
"""
- new_messages = []
- sbuf = rmr.rmr_alloc_msg(MRC, 4096)
- while True:
- sbuf = rmr.rmr_torcv_msg(MRC, sbuf, 0) # set the timeout to 0 so this doesn't block!!
- summary = rmr.message_summary(sbuf)
- if summary["message state"] == 12 and summary["message status"] == "RMR_ERR_TIMEOUT":
- # no new messages
- break
- else:
- if (not filter_type) or (summary["message type"] == filter_type):
- # message is relevent
- new_messages.append(summary)
- else:
- # "spam", do nothing with message, effectively dropped
- logger.debug("A message was received by a1, but it was not desired, droping: %s", summary)
-
- return new_messages
+ return helpers.rmr_rcvall_msgs(MRC, filter_type)
diff --git a/a1/data.py b/a1/data.py
index 51ba044..d701373 100644
--- a/a1/data.py
+++ b/a1/data.py
@@ -23,10 +23,10 @@
For now, the database is in memory.
We use dict data structures (KV) with the expectation of having to move this into Redis
"""
+import json
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists
from a1 import get_module_logger
from a1 import a1rmr
-import json
logger = get_module_logger(__name__)
@@ -109,7 +109,7 @@
pops a1s waiting mailbox
"""
# pop through a1s mailbox, updating a1s db of all policy statuses
- for msg in a1rmr.dequeue_all_waiting_messages(21024):
+ for msg in a1rmr.dequeue_all_waiting_messages([21024]):
# try to parse the messages as responses. Drop those that are malformed
# NOTE: we don't use the parameters "policy_type_id, policy_instance" from above here,
# because we are popping the whole mailbox, which might include other statuses
diff --git a/container-tag.yaml b/container-tag.yaml
index 2ac2520..3bf180b 100644
--- a/container-tag.yaml
+++ b/container-tag.yaml
@@ -1,4 +1,4 @@
# The Jenkins job uses this string for the tag in the image name
# for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
---
-tag: 0.14.0-NOT_FOR_USE_YET
+tag: 0.14.1-NOT_FOR_USE_YET
diff --git a/docs/release-notes.rst b/docs/release-notes.rst
index dd26d9b..19f660e 100644
--- a/docs/release-notes.rst
+++ b/docs/release-notes.rst
@@ -29,6 +29,17 @@
* Release 1.0.0 will be the Release A version of A1
+[0.14.1] - 10/2/2019
+::
+
+ * Upgrade rmr to 1.9.0
+ * Upgrade rmr-python to 0.13.2
+ * Use the new helpers module in rmr-python for the rec all functionality
+ * Switch rmr mode to a multithreaded mode that continuously reads from rmr and populates an internal queue of messages with a deterministic queue size (2048) which is better behavior for A1
+ * Fix a memory leak (python obj is garbage collected but not the underlying C memory allocation)
+
+
+
[0.14.0] - 10/1/2019
::
diff --git a/install_deps.sh b/install_deps.sh
index 9ab8cae..01c45b4 100755
--- a/install_deps.sh
+++ b/install_deps.sh
@@ -1,5 +1,5 @@
#!/bin/sh
-git clone --branch 1.8.1 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
+git clone --branch 1.9.0 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir .build; cd .build; cmake .. -DPACK_EXTERNALS=1; sudo make install \
&& cd ../.. \
diff --git a/integration_tests/Dockerfile b/integration_tests/Dockerfile
index 19aae4a..3e8c874 100644
--- a/integration_tests/Dockerfile
+++ b/integration_tests/Dockerfile
@@ -16,8 +16,8 @@
# ==================================================================================
# install a well known working rmr
FROM python:3.7-alpine
-RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.8.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
+RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir build \
&& cd build \
@@ -34,7 +34,7 @@
# Install RMr python bindings
RUN pip install --upgrade pip
-RUN pip install rmr==0.10.8
+RUN pip install rmr==0.13.2
# rmr setups
RUN mkdir -p /opt/route/
diff --git a/integration_tests/a1mediator/Chart.yaml b/integration_tests/a1mediator/Chart.yaml
index 142094b..2247fdd 100644
--- a/integration_tests/a1mediator/Chart.yaml
+++ b/integration_tests/a1mediator/Chart.yaml
@@ -1,4 +1,4 @@
apiVersion: v1
description: A1 Helm chart for Kubernetes
name: a1mediator
-version: 0.14.0
+version: 0.14.1
diff --git a/integration_tests/receiver.py b/integration_tests/receiver.py
index 75294c6..d73dcd9 100644
--- a/integration_tests/receiver.py
+++ b/integration_tests/receiver.py
@@ -27,8 +27,7 @@
DELAY = int(os.environ.get("TEST_RCV_SEC_DELAY", 0))
HANDLER_ID = os.environ.get("HANDLER_ID", "test_receiver")
-# TODO: should these be made constants?
-mrc = rmr.rmr_init(PORT.encode("utf-8"), rmr.RMR_MAX_RCV_BYTES, 0x00)
+mrc = rmr.rmr_init(PORT.encode("utf-8"), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
while rmr.rmr_ready(mrc) == 0:
time.sleep(1)
diff --git a/setup.py b/setup.py
index 983e130..0aa2b3f 100644
--- a/setup.py
+++ b/setup.py
@@ -18,13 +18,13 @@
setup(
name="a1",
- version="0.14.0",
+ version="0.14.1",
packages=find_packages(exclude=["tests.*", "tests"]),
author="Tommy Carpenter",
description="RIC A1 Mediator for policy/intent changes",
url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
entry_points={"console_scripts": ["run.py=a1.run:main"]},
# we require jsonschema, should be in that list, but connexion already requires a specific version of it
- install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "rmr>=0.10.8"],
+ install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "rmr>=0.13.2"],
package_data={"a1": ["openapi.yaml"]},
)
diff --git a/tests/test_controller.py b/tests/test_controller.py
index 88d6f70..2866dd8 100644
--- a/tests/test_controller.py
+++ b/tests/test_controller.py
@@ -76,6 +76,13 @@
rmr_mocks.patch_rmr(monkeypatch)
monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(0)) # good sends for this whole batch
+ # we need this because free expects a real sbuf
+ # TODO: move this into rmr_mocks
+ def noop(_sbuf):
+ pass
+
+ monkeypatch.setattr("rmr.rmr.rmr_free_msg", noop)
+
# we need to repatch alloc (already patched in patch_rmr) to fix the transactionid, alloc is called in send and recieve
def fake_alloc(_unused, _alsounused):
sbuf = rmr_mocks.Rmr_mbuf_t()
diff --git a/tox-integration.ini b/tox-integration.ini
index b562f86..e39a8cb 100644
--- a/tox-integration.ini
+++ b/tox-integration.ini
@@ -49,9 +49,9 @@
echo "running ab"
# run apache bench
ab -n 100 -c 10 -u putdata -T application/json http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
+commands_post=
# echo "log collection"
# integration_tests/getlogs.sh
-commands_post=
echo "teardown"
helm delete testreceiver
helm del --purge testreceiver