blob: 756a0d042abc3c7d9ad49ba6063a5d7eff62d393 [file] [log] [blame]
# ==================================================================================
# Copyright (c) 2019 Nokia
# Copyright (c) 2018-2019 AT&T Intellectual Property.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
import time
from rmr.rmr_mocks import rmr_mocks
from a1 import a1rmr
ADM_CTRL = "admission_control_policy"
ADM_CTRL_POLICIES = "/a1-p/policytypes/20000/policies"
ADM_CTRL_INSTANCE = ADM_CTRL_POLICIES + "/" + ADM_CTRL
ADM_CTRL_INSTANCE_STATUS = ADM_CTRL_INSTANCE + "/status"
ADM_CTRL_TYPE = "/a1-p/policytypes/20000"
TEST_TYPE = "/a1-p/policytypes/20001"
def _fake_dequeue():
"""for monkeypatching with a good status"""
pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}'
fake_msg = {"payload": pay}
return [fake_msg]
def _fake_dequeue_none():
"""for monkeypatching with no waiting messages"""
return []
def _fake_dequeue_deleted():
"""for monkeypatching with a DELETED status"""
new_msgs = []
# insert some that don't exist to make sure nothing blows up
pay = b'{"policy_type_id": 20666, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "DELETED"}'
fake_msg = {"payload": pay}
new_msgs.append(fake_msg)
pay = b'{"policy_type_id": 20000, "policy_instance_id": "darkness", "handler_id": "test_receiver", "status": "DELETED"}'
fake_msg = {"payload": pay}
new_msgs.append(fake_msg)
# insert a bad one with a malformed body to make sure we keep going
fake_msg = {"payload": "asdf"}
new_msgs.append(fake_msg)
pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "DELETED"}'
fake_msg = {"payload": pay}
new_msgs.append(fake_msg)
return new_msgs
def _test_put_patch(monkeypatch):
rmr_mocks.patch_rmr(monkeypatch)
# assert that rmr bad states don't cause problems
monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
# we need this because free expects a real sbuf
# TODO: move this into rmr_mocks
def noop(_sbuf):
pass
monkeypatch.setattr("rmr.rmr.rmr_free_msg", noop)
# we need to repatch alloc (already patched in patch_rmr) to fix the transactionid, alloc is called in send and recieve
def fake_alloc(_unused1, _unused2, _unused3, _unused4, _unused5):
sbuf = rmr_mocks.Rmr_mbuf_t()
sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
return sbuf
# we also need to repatch set, since in the send function, we alloc, then set a new transid
def fake_set_transactionid(sbuf):
sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
# Note, we could have just patched summary, but this patches at a "lower level" so is a better test
monkeypatch.setattr("rmr.rmr.rmr_alloc_msg", fake_alloc)
monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_set_transactionid)
# Module level Hack
def setup_module():
"""module level setup"""
def noop():
pass
# launch the thread with a fake init func and a patched rcv func; we will "repatch" later
a1rmr.start_rmr_thread(init_func_override=noop, rcv_func_override=_fake_dequeue_none)
# Actual Tests
def test_workflow_nothing_there_yet(client):
""" test policy put good"""
# no type there yet
res = client.get(ADM_CTRL_TYPE)
assert res.status_code == 404
# no types at all
res = client.get("/a1-p/policytypes")
assert res.status_code == 200
assert res.json == []
# instance 404 because type not there yet
res = client.get(ADM_CTRL_POLICIES)
assert res.status_code == 404
def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
"""
test a full A1 workflow
"""
# put the type
res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
assert res.status_code == 201
# cant replace types
res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
assert res.status_code == 400
# type there now
res = client.get(ADM_CTRL_TYPE)
assert res.status_code == 200
assert res.json == adm_type_good
res = client.get("/a1-p/policytypes")
assert res.status_code == 200
assert res.json == [20000]
# instance 200 but empty list
res = client.get(ADM_CTRL_POLICIES)
assert res.status_code == 200
assert res.json == []
# no instance there yet
res = client.get(ADM_CTRL_INSTANCE)
assert res.status_code == 404
res = client.get(ADM_CTRL_INSTANCE_STATUS)
assert res.status_code == 404
# create a good instance
_test_put_patch(monkeypatch)
res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
assert res.status_code == 202
# replace is allowed on instances
res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
assert res.status_code == 202
# instance 200 and in list
res = client.get(ADM_CTRL_POLICIES)
assert res.status_code == 200
assert res.json == [ADM_CTRL]
def get_instance_good(expected):
# get the instance
res = client.get(ADM_CTRL_INSTANCE)
assert res.status_code == 200
assert res.json == adm_instance_good
# get the instance status
res = client.get(ADM_CTRL_INSTANCE_STATUS)
assert res.status_code == 200
assert res.get_data(as_text=True) == expected
# try a status get but we didn't get any ACKs yet to test NOT IN EFFECT
time.sleep(1) # wait for the rmr thread
get_instance_good("NOT IN EFFECT")
# now pretend we did get a good ACK
a1rmr.replace_rcv_func(_fake_dequeue)
time.sleep(1) # wait for the rmr thread
get_instance_good("IN EFFECT")
# cant delete type until there are no instances
res = client.delete(ADM_CTRL_TYPE)
assert res.status_code == 400
# delete it
res = client.delete(ADM_CTRL_INSTANCE)
assert res.status_code == 202
res = client.delete(ADM_CTRL_INSTANCE) # should be able to do multiple deletes
assert res.status_code == 202
# status after a delete, but there are no messages yet, should still return
time.sleep(1) # wait for the rmr thread
get_instance_good("IN EFFECT")
# now pretend we deleted successfully
a1rmr.replace_rcv_func(_fake_dequeue_deleted)
time.sleep(1) # wait for the rmr thread
# list still 200 but no instance
res = client.get(ADM_CTRL_POLICIES)
assert res.status_code == 200
assert res.json == []
res = client.get(ADM_CTRL_INSTANCE_STATUS) # cant get status
assert res.status_code == 404
res = client.get(ADM_CTRL_INSTANCE) # cant get instance
assert res.status_code == 404
# delete the type
res = client.delete(ADM_CTRL_TYPE)
assert res.status_code == 204
# cant touch this
res = client.get(ADM_CTRL_TYPE)
assert res.status_code == 404
res = client.delete(ADM_CTRL_TYPE)
assert res.status_code == 404
def test_bad_instances(client, monkeypatch, adm_type_good):
"""
test various failure modes
"""
# put the type (needed for some of the tests below)
rmr_mocks.patch_rmr(monkeypatch)
res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
assert res.status_code == 201
# bad body
res = client.put(ADM_CTRL_INSTANCE, json={"not": "expected"})
assert res.status_code == 400
# bad media type
res = client.put(ADM_CTRL_INSTANCE, data="notajson")
assert res.status_code == 415
# delete a non existent instance
res = client.delete(ADM_CTRL_INSTANCE + "DARKNESS")
assert res.status_code == 404
# get a non existent instance
a1rmr.replace_rcv_func(_fake_dequeue)
time.sleep(1)
res = client.get(ADM_CTRL_INSTANCE + "DARKNESS")
assert res.status_code == 404
# delete the type (as cleanup)
res = client.delete(ADM_CTRL_TYPE)
assert res.status_code == 204
def test_illegal_types(client, adm_type_good):
"""
Test illegal types
"""
res = client.put("/a1-p/policytypes/19999", json=adm_type_good)
assert res.status_code == 400
res = client.put("/a1-p/policytypes/21024", json=adm_type_good)
assert res.status_code == 400
def test_healthcheck(client):
"""
test healthcheck
"""
res = client.get("/a1-p/healthcheck")
assert res.status_code == 200
def teardown_module():
"""module teardown"""
a1rmr.stop_rmr_thread()