blob: a503968dff64af498924df82ed09be8180069f14 [file] [log] [blame]
# vim: ts=4 sw=4 expandtab:
# =================================================================================2
# Copyright (c) 2019-2020 Nokia
# Copyright (c) 2018-2020 AT&T Intellectual Property.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
import time
import pytest
from ricxappframe.rmr import rmr, helpers, exceptions
SIZE = 256
MRC_SEND = None
MRC_RCV = None
def setup_module():
"""
test_rmr module setup
"""
global MRC_SEND
MRC_SEND = rmr.rmr_init(b"3562", rmr.RMR_MAX_RCV_BYTES, 0x00)
while rmr.rmr_ready(MRC_SEND) == 0:
time.sleep(1)
global MRC_RCV
MRC_RCV = rmr.rmr_init(b"3563", rmr.RMR_MAX_RCV_BYTES, 0x00)
while rmr.rmr_ready(MRC_RCV) == 0:
time.sleep(1)
global MRC_BUF_RCV
MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, 0x02)
while rmr.rmr_ready(MRC_BUF_RCV) == 0:
time.sleep(1)
def teardown_module():
"""
test rmr module teardown
"""
rmr.rmr_close(MRC_SEND)
rmr.rmr_close(MRC_RCV)
def _assert_new_sbuf(sbuf):
"""
verify the initial state of an alloced message is what we expect
"""
summary = rmr.message_summary(sbuf)
assert summary["payload"] == b""
assert summary["payload length"] == 0
assert summary["subscription id"] == -1
assert summary["transaction id"] == b""
assert summary["message state"] == 0
assert summary["message status"] == "RMR_OK"
assert summary["meid"] == b""
assert summary["errno"] == 0
def test_get_constants(expected_constants):
"""
test getting constants. We don't care what values are returned as those
should be meaningful only to RMR. We do care that all of the constants
which are defined in expected_contents are returned. Further, we don't
consider it to be an error if the returned list has more constants than
what are in our list.
To avoid frustration, this should list all missing keys, not fail on the
first missing key.
"""
errors = 0
econst = expected_constants
rconst = rmr._get_constants()
for key in econst: # test all expected constants
if key not in rconst: # expected value not listed by rmr
errors += 1
print("did not find required constant in list from RMR: %s" % key)
assert errors == 0
def test_get_mapping_dict(expected_states):
"""
test getting mapping string
"""
assert rmr._get_mapping_dict() == expected_states
assert rmr._state_to_status(0) == "RMR_OK"
assert rmr._state_to_status(12) == "RMR_ERR_TIMEOUT"
assert rmr._state_to_status(666) == "UNKNOWN STATE"
def test_meid():
"""
test meid stringification
"""
sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
rmr.rmr_set_meid(sbuf, b"\x01\x02")
assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
assert len(rmr.rmr_get_meid(sbuf)) == 2
rmr.rmr_set_meid(sbuf, b"\x00" * 31)
assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"" # NULL bytes get truncated
rmr.rmr_set_meid(sbuf, b"6" * 31)
assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"6" * 31 # string in string out
rmr.rmr_set_meid(sbuf, b"\x01\x02")
assert (
rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
) # Ctypes will chop at first nil, so expect only 2 bytes back
assert len(rmr.rmr_get_meid(sbuf)) == 2
# test that an exception is raised for buffers which are too long
with pytest.raises(exceptions.MeidSizeOutOfRange):
rmr.rmr_set_meid(sbuf, b"8" * 32)
def test_rmr_set_get():
"""
test set functions
"""
sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
_assert_new_sbuf(sbuf)
# test payload
pay = b"\x01\x00\x80"
rmr.set_payload_and_length(pay, sbuf)
summary = rmr.message_summary(sbuf)
assert summary["payload"] == pay
assert summary["payload length"] == 3
# test transid (note we cant test payload because it's randomly gen)
assert summary["transaction id"] == b""
assert len(summary["transaction id"]) == 0
rmr.generate_and_set_transaction_id(sbuf)
summary = rmr.message_summary(sbuf)
assert summary["transaction id"] != b""
assert len(summary["transaction id"]) == 32
# test meid
assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b""
rmr.rmr_set_meid(sbuf, b"666\x01\x00\x01")
summary = rmr.message_summary(sbuf)
assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b"666\x01"
assert (len(summary["meid"])) == 4
def test_alloc_fancy():
"""test allocation with setting payload, trans, mtype, subid"""
pay = b"yoo\x01\x00\x80"
sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE, payload=pay, gen_transaction_id=True, mtype=14, meid=b"asdf", sub_id=654321)
summary = rmr.message_summary(sbuf)
assert summary["payload"] == pay
assert summary["payload length"] == 6
assert summary["transaction id"] != b"" # hard to test what it will be, but make sure not empty
assert len(summary["transaction id"]) == 32
assert summary["message state"] == 0
assert summary["message type"] == sbuf.contents.mtype == 14
assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b"asdf"
assert sbuf.contents.sub_id == summary["subscription id"] == 654321
def test_alloc_overlapping_flags():
"""test allocation with setting the transaction id"""
sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE, gen_transaction_id=True, fixed_transaction_id=b"6" * 32)
summary = rmr.message_summary(sbuf)
assert summary["transaction id"] == b"66666666666666666666666666666666"
def test_rcv_timeout():
"""
test torcv; this is a scary test because if it fails... it doesn't fail, it will run forever!
We receive a message (though nothing has been sent) and make sure the function doesn't block forever.
There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
"""
sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50) # should time out after 50ms
summary = rmr.message_summary(sbuf_rcv)
assert summary["message state"] == 12
assert summary["message status"] == "RMR_ERR_TIMEOUT"
def test_send_rcv():
"""
test send and receive
"""
pay = b"\x01\x00\x80"
# send a message
sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
_assert_new_sbuf(sbuf_send)
rmr.set_payload_and_length(pay, sbuf_send)
sbuf_send.contents.mtype = 0
sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
send_summary = rmr.message_summary(sbuf_send)
assert send_summary["message state"] == 0 # if send fails don't attempt receive
assert send_summary["message status"] == "RMR_OK"
time.sleep(0.5)
# receive it in other context
sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
rcv_summary = rmr.message_summary(sbuf_rcv)
assert rcv_summary["message state"] == 0
assert rcv_summary["message status"] == "RMR_OK"
assert rcv_summary["message type"] == 0
assert rcv_summary["payload"] == pay
# send an ACK back
ack_pay = b"message received"
sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv, payload=ack_pay, mtype=6666)
rcv_ack_summary = rmr.message_summary(sbuf_rcv)
# have the sender receive it
sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
send_ack_summary = rmr.message_summary(sbuf_send)
assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"
assert send_ack_summary["payload"] == ack_pay
assert send_ack_summary["message type"] == 6666
def test_send_rcv_subid_good():
"""
test send and receive where subid is used for routing
"""
pay = b"\x01\x00\x80"
test_mtype = 46656
test_subid = 777
# send a message
sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, pay, mtype=test_mtype, sub_id=test_subid)
pre_send_summary = rmr.message_summary(sbuf_send)
sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
send_summary = rmr.message_summary(sbuf_send)
# receive it in other context
time.sleep(0.5)
sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, 3)
sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
rcv_summary = rmr.message_summary(sbuf_rcv)
# asserts
assert send_summary["message state"] == rcv_summary["message state"] == 0
assert send_summary["message status"] == rcv_summary["message status"] == "RMR_OK"
assert pre_send_summary["payload"] == rcv_summary["payload"] == pay
assert pre_send_summary["message type"] == rcv_summary["message type"] == test_mtype
assert pre_send_summary["subscription id"] == rcv_summary["subscription id"] == test_subid
def test_send_rcv_subid_bad_subid():
"""
test send and receive where subid is used for routing but nobody recieves this subid
"""
sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46656, sub_id=778)
sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
assert rmr.message_summary(sbuf_send)["message state"] == 2
assert rmr.message_summary(sbuf_send)["message status"] == "RMR_ERR_NOENDPT"
def test_send_rcv_subid_bad_mtype():
"""
test send and receive where subid is used for routing but nobody recieves this mtype
"""
sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46657, sub_id=777)
sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
assert rmr.message_summary(sbuf_send)["message state"] == 2
assert rmr.message_summary(sbuf_send)["message status"] == "RMR_ERR_NOENDPT"
def send_burst(mrc, fmt, mtype=1, num=13, counter=0):
"""
Internal function to support test_rcv_all.
Send a burst of messages optionally giving the type, payload
and number to send.
"""
sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE) # seed message buffer
for i in range(num):
payload = bytes(fmt % counter, "UTF-8")
counter += 1
rmr.set_payload_and_length(payload, sbuf_send)
sbuf_send.contents.mtype = mtype
max_retries = 5
while max_retries > 0:
sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
ms = rmr.message_summary(sbuf_send)
if ms["message state"] != 10: # 10 is retry
break
max_retries -= 1
time.sleep(0.75)
assert ms["message state"] == 0
assert max_retries > 0
def test_rcv_all():
"""
test the ability to receive a batch of queued messages.
"""
pay_fmt = "send to ring msg: %d" # dynamic message format with counter
send_burst(MRC_SEND, pay_fmt) # send a bunch of 13 messages that should queue
time.sleep(1) # ensure underlying transport gets cycles to send/receive
bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV) # use the buffered receiver to read all with a single call
assert len(bundle) == 13
for i, ms in enumerate(bundle):
ms = bundle[i] # validate each summary returned, and ordering preserved
assert ms["message state"] == 0
expected_pay = bytes(pay_fmt % i, "UTF-8")
assert ms["payload"] == expected_pay
send_burst(MRC_SEND, pay_fmt, mtype=1, num=10) # send a second round with msg types 1 and 2 to test filter
send_burst(MRC_SEND, pay_fmt, mtype=2, num=8)
send_burst(MRC_SEND, pay_fmt, mtype=1, num=5)
send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8) # total of 12 messages with type 2 should be queued
time.sleep(1) # ensure underlying transport gets cycles to send/receive
bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2]) # receive only message type 2 messages
assert len(bundle) == 12 # we should only get the second batch of 12 messages
for i, (ms, sbuf) in enumerate(bundle): # test the raw version
test_summary = rmr.message_summary(sbuf)
assert test_summary == ms
assert ms["message state"] == 0 # all should be OK
assert ms["message type"] == 2 # only mtype 2 should have been received
expected_pay = bytes(pay_fmt % i, "UTF-8") # ordering should still jive with the counter
assert ms["payload"] == expected_pay
rmr.rmr_free_msg(sbuf)
def test_bad_buffer():
"""test that we get a proper exception when the buffer has a null pointer"""
with pytest.raises(exceptions.BadBufferAllocation):
rmr.rmr_alloc_msg(None, 4096)
def test_resize_payload():
"""test the ability to insert a larger payload into an existing message"""
mtype = 99
subid = 100
mbuf = rmr.rmr_alloc_msg(MRC_SEND, 25) # allocate buffer with small payload
mbuf.contents.mtype = mtype # type and sub-id should not change
mbuf.contents.sub_id = subid
long_payload = b"This is a long payload that should force the message buffer to be reallocated"
rmr.set_payload_and_length(long_payload, mbuf)
summary = rmr.message_summary(mbuf)
assert summary["payload max size"] >= len(long_payload) # RMR may allocate a larger payload space
assert summary["payload length"] == len(long_payload) # however, the length must be exactly the same
assert summary["message type"] == mtype # both mtype and sub-id should be preserved in new
assert summary["subscription id"] == subid