blob: 7479e1a159d1b3aec7bb1c27e65eaebadad56044 [file] [log] [blame]
# ==================================================================================
# Copyright (c) 2019 Nokia
# Copyright (c) 2018-2019 AT&T Intellectual Property.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
from ricxappframe.rmr import rmr
from ricxappframe.rmr.rmr_mocks import rmr_mocks
MRC = None
SIZE = 256
def _partial_dict_comparison(subset_dict, target_dict):
"""
Compares that target_dict[k] == subset_dict[k] for all k <- subset_dict
"""
for k, v in subset_dict.items():
assert k in target_dict
assert target_dict[k] == subset_dict[k]
def test_send_mock(monkeypatch):
"""
tests the send mock
"""
monkeypatch.setattr("ricxappframe.rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(12))
rmr_mocks.patch_rmr(monkeypatch)
sbuf = rmr.rmr_alloc_msg(MRC, SIZE)
rmr.set_payload_and_length("testttt".encode("utf8"), sbuf)
expected = {
"meid": None,
"message source": "localtest:80",
"message state": 0,
"message type": 0,
"message status": "RMR_OK",
"payload": b"testttt",
"payload length": 7,
"payload max size": 4096,
"subscription id": 0,
}
_partial_dict_comparison(expected, rmr.message_summary(sbuf))
# set the mtype
sbuf.contents.mtype = 666
# send it (the fake send sets the state, and touches nothing else)
sbuf = rmr.rmr_send_msg(MRC, sbuf)
expected = {
"meid": None,
"message source": "localtest:80",
"message state": 12,
"message type": 666,
"message status": "RMR_ERR_TIMEOUT",
"payload": None,
"payload length": 7,
"payload max size": 4096,
"subscription id": 0,
}
_partial_dict_comparison(expected, rmr.message_summary(sbuf))
def test_rcv_mock(monkeypatch):
"""
tests the rmr recieve mocking generator
"""
rmr_mocks.patch_rmr(monkeypatch)
sbuf = rmr.rmr_alloc_msg(MRC, SIZE)
# test rcv
monkeypatch.setattr("ricxappframe.rmr.rmr.rmr_rcv_msg", rmr_mocks.rcv_mock_generator({"foo": "bar"}, 666, 0, True))
sbuf = rmr.rmr_rcv_msg(MRC, sbuf)
assert rmr.get_payload(sbuf) == b'{"foo": "bar"}'
assert sbuf.contents.mtype == 666
assert sbuf.contents.state == 0
assert sbuf.contents.len == 14
# test torcv, although the timeout portion is not currently mocked or tested
monkeypatch.setattr("ricxappframe.rmr.rmr.rmr_torcv_msg", rmr_mocks.rcv_mock_generator({"foo": "bar"}, 666, 0, True, 50))
sbuf = rmr.rmr_torcv_msg(MRC, sbuf, 5)
assert rmr.get_payload(sbuf) == b'{"foo": "bar"}'
assert sbuf.contents.mtype == 666
assert sbuf.contents.state == 0
assert sbuf.contents.len == 14
def test_alloc(monkeypatch):
"""
test alloc with all fields set
"""
rmr_mocks.patch_rmr(monkeypatch)
sbuf = rmr.rmr_alloc_msg(
MRC, SIZE, payload=b"foo", gen_transaction_id=True, mtype=5, meid=b"mee", sub_id=234, fixed_transaction_id=b"t" * 32
)
summary = rmr.message_summary(sbuf)
assert summary["payload"] == b"foo"
assert summary["transaction id"] == b"t" * 32
assert summary["message type"] == 5
assert summary["meid"] == b"mee"
assert summary["subscription id"] == 234